/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.solr.cloud;

import static org.apache.solr.cloud.SolrCloudTestCase.configurePrsDefault;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.tests.mockfile.FilterPath;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.solr.SolrTestCaseJ4.SuppressSSL;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.SolrQuery;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.handler.BackupStatusChecker;
import org.apache.solr.handler.ReplicationHandler;
import org.junit.BeforeClass;
import org.junit.Test;

/**
 * This test simply does a bunch of basic things in solrcloud mode and asserts things work as
 * expected.
 */
// Backups do checksum validation against a footer value not present in 'SimpleText'
@LuceneTestCase.SuppressCodecs({"SimpleText"})
@SuppressSSL(bugUrl = "https://issues.apache.org/jira/browse/SOLR-5776")
public class BasicDistributedZk2Test extends AbstractFullDistribZkTestBase {
  private static final String SHARD2 = "shard2";
  private static final String SHARD1 = "shard1";
  private static final String ONE_NODE_COLLECTION = "onenodecollection";
  private final boolean onlyLeaderIndexes = random().nextBoolean();

  public BasicDistributedZk2Test() {
    super();
    // we need DVs on point fields to compute stats & facets
    if (Boolean.getBoolean(NUMERIC_POINTS_SYSPROP))
      System.setProperty(NUMERIC_DOCVALUES_SYSPROP, "true");

    sliceCount = 2;
  }

  @Override
  protected boolean useTlogReplicas() {
    return false; // TODO: tlog replicas makes commits take way to long due to what is likely a bug
    // and it's TestInjection use
  }

  @BeforeClass
  public static void _setPrsDefault() {
    configurePrsDefault();
  }

  @Test
  @ShardsFixed(num = 4)
  public void test() throws Exception {
    boolean testFinished = false;
    try {
      handle.clear();
      handle.put("timestamp", SKIPVAL);

      testNodeWithoutCollectionForwarding();

      indexr(
          id,
          1,
          i1,
          100,
          tlong,
          100,
          t1,
          "now is the time for all good men",
          "foo_f",
          1.414f,
          "foo_b",
          "true",
          "foo_d",
          1.414d);

      commit();

      // make sure we are in a steady state...
      waitForRecoveriesToFinish(false);

      assertDocCounts(false);

      indexAbunchOfDocs();

      // check again
      waitForRecoveriesToFinish(false);

      commit();

      assertDocCounts(VERBOSE);
      checkQueries();

      assertDocCounts(VERBOSE);

      query("q", "*:*", "sort", "n_tl1 desc");

      bringDownShardIndexSomeDocsAndRecover();

      query("q", "*:*", "sort", "n_tl1 desc");

      // test adding another replica to a shard - it should do a
      // recovery/replication to pick up the index from the leader
      addNewReplica();

      long docId = testUpdateAndDelete();

      // index a bad doc...
      expectThrows(SolrException.class, () -> indexr(t1, "a doc with no id"));

      // TODO: bring this to its own method?
      // try indexing to a leader that has no replicas up
      ZkStateReader zkStateReader = ZkStateReader.from(cloudClient);
      ZkNodeProps leaderProps = zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, SHARD2);

      String nodeName = leaderProps.getStr(ZkStateReader.NODE_NAME_PROP);
      chaosMonkey.stopShardExcept(SHARD2, nodeName);

      SolrClient client = getClient(nodeName);

      index_specific(client, "id", docId + 1, t1, "what happens here?");

      // expire a session...
      CloudJettyRunner cloudJetty = shardToJetty.get(SHARD1).get(0);
      chaosMonkey.expireSession(cloudJetty.jetty);
      // Wait until the jetty is reconnected, otherwise the following index command could fail
      cloudJetty
          .jetty
          .getCoreContainer()
          .getZkController()
          .getZkClient()
          .getCuratorFramework()
          .blockUntilConnected(50, TimeUnit.MILLISECONDS);

      indexr("id", docId + 1, t1, "slip this doc in");

      waitForRecoveriesToFinish(false);

      checkShardConsistency(SHARD1);
      checkShardConsistency(SHARD2);

      testFinished = true;
    } finally {
      if (!testFinished) {
        printLayoutOnTearDown = true;
      }
    }
  }

  private void testNodeWithoutCollectionForwarding() throws Exception {
    assertEquals(
        0,
        CollectionAdminRequest.createCollection(ONE_NODE_COLLECTION, "conf1", 1, 1)
            .setCreateNodeSet("")
            .process(cloudClient)
            .getStatus());
    assertTrue(
        CollectionAdminRequest.addReplicaToShard(ONE_NODE_COLLECTION, "shard1")
            .setCoreName(ONE_NODE_COLLECTION + "core")
            .process(cloudClient)
            .isSuccess());

    waitForCollection(ZkStateReader.from(cloudClient), ONE_NODE_COLLECTION, 1);
    waitForRecoveriesToFinish(ONE_NODE_COLLECTION, ZkStateReader.from(cloudClient), false);

    ZkStateReader.from(cloudClient).getLeaderRetry(ONE_NODE_COLLECTION, SHARD1, 30000);

    int docs = 2;
    for (JettySolrRunner jetty : jettys) {
      final String clientUrl = getBaseUrl(jetty);
      addAndQueryDocs(clientUrl, docs);
      docs += 2;
    }
  }

  // 2 docs added every call
  private void addAndQueryDocs(final String baseUrl, int docs) throws Exception {

    SolrQuery query = new SolrQuery("*:*");

    try (SolrClient client = getHttpSolrClient(baseUrl, "onenodecollection")) {
      // add a doc
      client.add(sdoc("id", docs));
      client.commit();

      QueryResponse results = client.query(query);
      assertEquals(docs - 1, results.getResults().getNumFound());

      SolrInputDocument doc = new SolrInputDocument();
      doc.addField("id", docs + 1);
      client.add(doc);
      client.commit();

      query = new SolrQuery("*:*");
      query.set("rows", 0);
      results = client.query(query);
      assertEquals(docs, results.getResults().getNumFound());
    }
  }

  private long testUpdateAndDelete() throws Exception {
    long docId = 99999999L;
    indexr("id", docId, t1, "originalcontent");

    commit();

    ModifiableSolrParams params = new ModifiableSolrParams();
    params.add("q", t1 + ":originalcontent");
    QueryResponse results = clients.get(0).query(params);
    assertEquals(1, results.getResults().getNumFound());

    // update doc
    indexr("id", docId, t1, "updatedcontent");

    commit();

    results = clients.get(0).query(params);
    assertEquals(0, results.getResults().getNumFound());

    params.set("q", t1 + ":updatedcontent");

    results = clients.get(0).query(params);
    assertEquals(1, results.getResults().getNumFound());

    UpdateRequest uReq = new UpdateRequest();
    // uReq.setParam(UpdateParams.UPDATE_CHAIN, DISTRIB_UPDATE_CHAIN);
    uReq.deleteById(Long.toString(docId)).process(clients.get(0));

    commit();

    results = clients.get(0).query(params);
    assertEquals(0, results.getResults().getNumFound());
    return docId;
  }

  private void bringDownShardIndexSomeDocsAndRecover() throws Exception {
    SolrQuery query = new SolrQuery("*:*");
    query.set("distrib", false);

    commit();

    long deadShardCount =
        shardToJetty.get(SHARD2).get(0).client.solrClient.query(query).getResults().getNumFound();

    query("q", "*:*", "sort", "n_tl1 desc");

    int oldLiveNodes =
        ZkStateReader.from(cloudClient)
            .getZkClient()
            .getChildren(ZkStateReader.LIVE_NODES_ZKNODE, null, true)
            .size();

    assertEquals(5, oldLiveNodes);

    // kill a shard
    CloudJettyRunner deadShard = chaosMonkey.stopShard(SHARD1, 0);

    // ensure shard is dead
    expectThrows(
        SolrServerException.class,
        "This server should be down and this update should have failed",
        () -> index_specific(deadShard.client.solrClient, id, 999, i1, 107, t1, "specific doc!"));

    commit();

    query("q", "*:*", "sort", "n_tl1 desc");

    // long cloudClientDocs = cloudClient.query(new
    // SolrQuery("*:*")).getResults().getNumFound();
    // System.out.println("clouddocs:" + cloudClientDocs);

    // try to index to a living shard at shard2

    long numFound1 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();

    ZkStateReader.from(cloudClient).getLeaderRetry(DEFAULT_COLLECTION, SHARD1, 60000);

    try {
      index_specific(
          shardToJetty.get(SHARD1).get(1).client.solrClient,
          id,
          1000,
          i1,
          108,
          t1,
          "specific doc!");
    } catch (Exception e) {
      // wait and try again
      Thread.sleep(4000);
      index_specific(
          shardToJetty.get(SHARD1).get(1).client.solrClient,
          id,
          1000,
          i1,
          108,
          t1,
          "specific doc!");
    }

    commit();

    checkShardConsistency(true, false);

    query("q", "*:*", "sort", "n_tl1 desc");

    long numFound2 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();

    assertEquals(numFound1 + 1, numFound2);

    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", 1001);

    controlClient.add(doc);

    // try adding a doc with CloudSolrServer
    UpdateRequest ureq = new UpdateRequest();
    ureq.add(doc);
    // ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);

    try {
      ureq.process(cloudClient);
    } catch (SolrServerException e) {
      // try again
      Thread.sleep(3500);
      ureq.process(cloudClient);
    }

    commit();

    query("q", "*:*", "sort", "n_tl1 desc");

    long numFound3 = cloudClient.query(new SolrQuery("*:*")).getResults().getNumFound();

    // lets just check that the one doc since last commit made it in...
    assertEquals(numFound2 + 1, numFound3);

    // test debugging
    testDebugQueries();

    if (VERBOSE) {
      System.err.println(controlClient.query(new SolrQuery("*:*")).getResults().getNumFound());

      for (SolrClient client : clients) {
        try {
          SolrQuery q = new SolrQuery("*:*");
          q.set("distrib", false);
          System.err.println(client.query(q).getResults().getNumFound());
        } catch (Exception e) {

        }
      }
    }
    // TODO: This test currently fails because debug info is obtained only
    // on shards with matches.
    // query("q","matchesnothing","fl","*,score", "debugQuery", "true");

    // this should trigger a recovery phase on deadShard
    deadShard.jetty.start();

    // make sure we have published we are recovering
    Thread.sleep(1500);

    waitForRecoveriesToFinish(false);

    deadShardCount =
        shardToJetty.get(SHARD1).get(0).client.solrClient.query(query).getResults().getNumFound();
    // if we properly recovered, we should now have the couple missing docs that
    // came in while shard was down
    checkShardConsistency(true, false);

    // recover over 100 docs so we do more than just peer sync (replicate recovery)
    chaosMonkey.stopJetty(deadShard);

    for (int i = 0; i < 226; i++) {
      doc = new SolrInputDocument();
      doc.addField("id", 2000 + i);
      controlClient.add(doc);
      ureq = new UpdateRequest();
      ureq.add(doc);
      // ureq.setParam("update.chain", DISTRIB_UPDATE_CHAIN);
      ureq.process(cloudClient);
    }
    commit();

    Thread.sleep(1500);

    deadShard.jetty.start();

    // make sure we have published we are recovering
    Thread.sleep(1500);

    waitForThingsToLevelOut(1, TimeUnit.MINUTES);

    Thread.sleep(500);

    waitForRecoveriesToFinish(false);

    checkShardConsistency(true, false);

    // try a backup command
    try (final SolrClient client =
        getHttpSolrClient((String) shardToJetty.get(SHARD2).get(0).info.get("base_url"))) {
      final String backupName = "the_backup";
      ModifiableSolrParams params = new ModifiableSolrParams();
      params.set("qt", ReplicationHandler.PATH);
      params.set("command", "backup");
      params.set("name", backupName);
      final Path location = FilterPath.unwrap(createTempDir()).toRealPath();
      // Allow non-standard location outside SOLR_HOME
      jettys.forEach(j -> j.getCoreContainer().getAllowPaths().add(location));
      params.set("location", location.toString());

      QueryRequest request = new QueryRequest(params);
      client.request(request, DEFAULT_TEST_COLLECTION_NAME);

      final BackupStatusChecker backupStatus =
          new BackupStatusChecker(client, "/" + DEFAULT_TEST_COLLECTION_NAME + "/replication");
      final String backupDirName = backupStatus.waitForBackupSuccess(backupName, 30);
      assertTrue(
          "Backup dir does not exist: " + backupDirName,
          Files.exists(location.resolve(backupDirName)));
    }
  }

  private void addNewReplica() throws Exception {

    waitForRecoveriesToFinish(false);

    // new server should be part of first shard
    // how many docs are on the new shard?
    for (CloudJettyRunner cjetty : shardToJetty.get(SHARD1)) {
      if (VERBOSE)
        System.err.println(
            "shard1 total:"
                + cjetty.client.solrClient.query(new SolrQuery("*:*")).getResults().getNumFound());
    }
    for (CloudJettyRunner cjetty : shardToJetty.get(SHARD2)) {
      if (VERBOSE)
        System.err.println(
            "shard2 total:"
                + cjetty.client.solrClient.query(new SolrQuery("*:*")).getResults().getNumFound());
    }

    checkShardConsistency(SHARD1);
    checkShardConsistency(SHARD2);

    assertDocCounts(VERBOSE);
  }

  private void testDebugQueries() throws Exception {
    handle.put("explain", SKIPVAL);
    handle.put("debug", UNORDERED);
    handle.put("time", SKIPVAL);
    handle.put("track", SKIP);
    query("q", "now their fox sat had put", "fl", "*,score", CommonParams.DEBUG_QUERY, "true");
    query("q", "id_i1:[1 TO 5]", CommonParams.DEBUG_QUERY, "true");
    query("q", "id_i1:[1 TO 5]", CommonParams.DEBUG, CommonParams.TIMING);
    query("q", "id_i1:[1 TO 5]", CommonParams.DEBUG, CommonParams.RESULTS);
    query("q", "id_i1:[1 TO 5]", CommonParams.DEBUG, CommonParams.QUERY);
  }
}
